@Async异步任务与线程池 | 您所在的位置:网站首页 › class level 区别 › @Async异步任务与线程池 |
写在前面:本篇文章是关于使用@Async进行异步任务,并且关于线程池做了一个初步的梳理和总结,包括遇到过的一些坑 在工作中用到的一些线程池以下代码已做脱敏处理 1.newCachedThreadPool private void startTask(List usersList){ ExecutorService executor = Executors.newCachedThreadPool(); executor.submit(()->{ //do someting }); }2.newScheduledThreadPool @Configuration public class ScheduleConfig implements SchedulingConfigurer { @Override public void configureTasks(ScheduledTaskRegistrar taskRegistrar) { //当然了,这里设置的线程池是corePoolSize也是很关键了,自己根据业务需求设定 taskRegistrar.setScheduler(Executors.newScheduledThreadPool(10)); } }如果在idea中安装了阿里规范插件,就会发现上面两种创建线程池的方式都会报红。原因是: 线程池不允许使用Executors去创建,而是通过ThreadPoolExecutor的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。 说明:Executors返回的线程池对象的弊端如下: FixedThreadPool和SingleThreadPool: 允许的请求队列长度为Integer.MAX_VALUE,可能会堆积大量的请求,从而导致OOM。 CachedThreadPool: 允许的创建线程数量为Integer.MAX_VALUE,可能会创建大量的线程,从而导致OOM。 其实这里CachedThreadPool和newScheduledThreadPool是一样的,都是因为最大线程数被设置成了Integer.MAX_VALUE。 public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue()); } public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue()); }在源码中可以看的出newCachedThreadPool使用的是synchronousqueue队列,也可以看作是一个长度为1的BlockingQueue所以,再加上最大允许线程数为Integer.MAX_VALUE,就导致可能会创建大量线程导致OOM。 同理ScheduledThreadPoolExecutor使用的是DelayedWorkQueue,初始化大小为16。当队列满后就会创建新线程,就导致可能会创建大量线程导致OOM。 我们不妨实际来测试一下,以newCachedThreadPool为例,jvm参数-Xms64m -Xmx192m -Xss1024K -XX:MetaspaceSize=64m -XX:MaxMetaspaceSize=128m。 @PostMapping("/newCachedThreadPoolExample") @ResponseBody public void newCachedThreadPoolExample(){ ExecutorService executorService = Executors.newCachedThreadPool(); while (true){ executorService.submit(()->{ log.info("submit:"+LocalDateTime.now()); try { Thread.sleep(1000); }catch (InterruptedException e){ e.printStackTrace(); } }); } }刚启动时的情况: 请求接口后就开始爆炸
比较尴尬的是一直没有出现报错OOM的情况,就直接卡死了。
以上的线程池虽然可以在外部限制的情况下避免OOM等情况,但是还是建议尽量根据自己的业务情况自定义线程池。 使用@Async快速创建一个异步任务1. application.yml 这里是线程池相关配置,先不详细说,同理可以在代码里面配置config。 线程池缓冲队列的选择以上发生的问题大多数都和线程池的缓冲队列有关,选择一个符合自己业务特点的缓冲队列也十分重要。 2.ThreadpoolApplication 这里需要在 Application上添加 @EnableAsync注解,开启异步任务。如果是选择在代码里面写config,则需要在config文件上添加@EnableAsync注解。 @EnableAsync @SpringBootApplication public class ThreadpoolApplication { public static void main(String[] args) { SpringApplication.run(ThreadpoolApplication.class, args); } }3.AsyncTask 编写一个异步任务处理类,在需要开启异步的方法上面添加@Async @Component @Slf4j public class AsyncTask { @Async public void asyncRun() throws InterruptedException { Thread.sleep(10); log.info(Thread.currentThread().getName()+":处理完成"); } }4.AsyncService 编写一个调用异步方法的service @Service @Slf4j public class AsyncService { @Autowired private AsyncTask asyncTask; public void asyncSimpleExample() { try { log.info("service start"); asyncTask.asyncRun(); log.info("service end"); }catch (InterruptedException e){ e.printStackTrace(); } } }5.AsyncController 编写一个Controller去调用AsyncService /** * @author kurtl */ @Controller @RequestMapping("/") public class AsyncController { @Autowired private AsyncService asyncService; @PostMapping("/asyncSimpleExample") @ResponseBody public void asyncSimpleExample(){ asyncService.asyncSimpleExample(); } }最后请求这个接口 可以看到,先输出了asyncSimpleExample里面打印的service start与service end,表示service方法先执行完毕了,而异步方法则在调用后进行了一个sleep,service没有同步等待sleep完成,而是直接返回,表示这个是异步任务。至此我们已经通过@Async成功创建的异步任务。 关于@Async和@EnableAsync的原理个人觉得源码中很重要的一部分就是源码中的注释,阅读注释也可以帮你快速了解源码的作用等,所有我会把重要的注释稍微翻译一下 1.@Async源码 @Target({ElementType.TYPE, ElementType.METHOD}) @Retention(RetentionPolicy.RUNTIME) @Documented public @interface Async { /** * A qualifier value for the specified asynchronous operation(s). *May be used to determine the target executor to be used when executing * the asynchronous operation(s), matching the qualifier value (or the bean * name) of a specific {@link java.util.concurrent.Executor Executor} or * {@link org.springframework.core.task.TaskExecutor TaskExecutor} * bean definition. * When specified on a class-level {@code @Async} annotation, indicates that the * given executor should be used for all methods within the class. Method-level use * of {@code Async#value} always overrides any value set at the class level. * @since 3.1.2 */ /** * 在这些注释中有三个非常重要的部分 * 1.使用@Async的方法只能返回Void 或者 Future类型 * 2.表明了@Async是通过org.springframework.core.task.TaskExecutor * 或者java.util.concurrent.Executor来创建线程池 * 3.写了@Async的作用范围 在类上使用@Async会覆盖方法上的@Async */ String value() default ""; } 2.@EnableAsync源码 /** * Enables Spring's asynchronous method execution capability, similar to functionality * found in Spring's {@code } XML namespace. * *To be used together with @{@link Configuration Configuration} classes as follows, * enabling annotation-driven async processing for an entire Spring application context: * * * @Configuration * @EnableAsync * public class AppConfig { * * } * 这里表示需要联合@Configuration注解一起使用,所以@EnableAsync应该 * 添加在线程池Config或者SpringBootApplication 上 * {@code MyAsyncBean} is a user-defined type with one or more methods annotated with * either Spring's {@code @Async} annotation, the EJB 3.1 {@code @javax.ejb.Asynchronous} * annotation, or any custom annotation specified via the {@link #annotation} attribute. * The aspect is added transparently for any registered bean, for instance via this * configuration: * * * @Configuration * public class AnotherAppConfig { * * @Bean * public MyAsyncBean asyncBean() { * return new MyAsyncBean(); * } * } * * By default, Spring will be searching for an associated thread pool definition: * either a unique {@link org.springframework.core.task.TaskExecutor} bean in the context, * or an {@link java.util.concurrent.Executor} bean named "taskExecutor" otherwise. If * neither of the two is resolvable, a {@link org.springframework.core.task.SimpleAsyncTaskExecutor} * 默认情况下spring会先搜索TaskExecutor类型的bean或者名字为 * taskExecutor的Executor类型的bean,都不存在使用 * SimpleAsyncTaskExecutor执行器但是这个SimpleAsyncTaskExecutor实际 * 上是有很大的坑的,建议是自定义一个线程池,这个后面会说 * will be used to process async method invocations. Besides, annotated methods having * * @author Chris Beams * @author Juergen Hoeller * @author Stephane Nicoll * @author Sam Brannen * @since 3.1 * @see Async * @see AsyncConfigurer * @see AsyncConfigurationSelector */ @Target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME) @Documented @Import(AsyncConfigurationSelector.class) public @interface EnableAsync { /** * Indicate the 'async' annotation type to be detected at either class * or method level. * By default, both Spring's @{@link Async} annotation and the EJB 3.1 * {@code @javax.ejb.Asynchronous} annotation will be detected. * This attribute exists so that developers can provide their own * custom annotation type to indicate that a method (or all methods of * a given class) should be invoked asynchronously. */ Class submit(Runnable task) { FutureTask future = new FutureTask(task, null); execute(future, TIMEOUT_INDEFINITE); return future; } @Override public Future submit(Callable task) { FutureTask future = new FutureTask(task); execute(future, TIMEOUT_INDEFINITE); return future; } @Override public ListenableFuture submitListenable(Runnable task) { ListenableFutureTask future = new ListenableFutureTask(task, null); execute(future, TIMEOUT_INDEFINITE); return future; } @Override public ListenableFuture submitListenable(Callable task) { ListenableFutureTask future = new ListenableFutureTask(task); execute(future, TIMEOUT_INDEFINITE); return future; } /** * Template method for the actual execution of a task. * The default implementation creates a new Thread and starts it. * @param task the Runnable to execute * @see #setThreadFactory * @see #createThread * @see java.lang.Thread#start() */ //判断是否有工厂,没有的话调用父类创建线程 protected void doExecute(Runnable task) { Thread thread = (this.threadFactory != null ? this.threadFactory.newThread(task) : createThread(task)); thread.start(); } /** * Subclass of the general ConcurrencyThrottleSupport class, * making {@code beforeAccess()} and {@code afterAccess()} * visible to the surrounding class. */ private static class ConcurrencyThrottleAdapter extends ConcurrencyThrottleSupport { @Override protected void beforeAccess() { super.beforeAccess(); } @Override protected void afterAccess() { super.afterAccess(); } } /** * This Runnable calls {@code afterAccess()} after the * target Runnable has finished its execution. */ private class ConcurrencyThrottlingRunnable implements Runnable { private final Runnable target; public ConcurrencyThrottlingRunnable(Runnable target) { this.target = target; } @Override public void run() { try { this.target.run(); } finally { concurrencyThrottle.afterAccess(); } } } } 最主要的就是这段代码 /** * Template method for the actual execution of a task. *The default implementation creates a new Thread and starts it. * @param task the Runnable to execute * @see #setThreadFactory * @see #createThread * @see java.lang.Thread#start() */ //判断是否有工厂,没有的话调用父类创建线程 protected void doExecute(Runnable task) { Thread thread = (this.threadFactory != null ? this.threadFactory.newThread(task) : createThread(task)); thread.start(); } 这里并不是用线程池,而是直接创建新的线程,所以会大量创建线程导致OOM。其实这个类是可以通过setConcurrencyLimit设置最大线程数,通过synchronized和wati and notify去进行限流,这里不展开讲。所以结论是在使用@Async一定要设置线程池。 @Async异步失效以下代码已做脱敏处理 在看公司代码的时候,发现这样一段代码 public UserVO saveUser(HttpServletRequest request, String source) { String token = RequestUtils.getToken(request); String uid = checkUserLoginReturnUid(token); log.info("注册登录, token : {}, uid : {}", token, uid); //获取用户信息 User User = getLoginUser(uid); if(User == null){ User = new User(); //获取用户信息 Map userMap = redisTemplateMain.getUserMapByToken(token); //保存用户 saveUser(User, userMap, source); sendUserSystem(Integer.valueOf(userMap.get("id"))); } //用户信息放进缓存 setAuth2Redis(User); return setUser2Redis(User); } //通知用户系统,我们这边成功注册了一个用户 @Async public void sendUserSystem(Integer userId){ Map map = new HashMap(); map.put("mainUid", userId); map.put("source", ""); String json = HttpUtil.post(property.userRegisterSendSystem, map); log.info("sendUserSystem userId : {}, json : {}", userId, json); }在之前我们看源码的时候已经知道了,由于@Async的AdviceMode默认为PROXY,所以当调用方和被调用方是在同一个类中,无法产生切面,@Async没有被Spring容器管理。 所以这个方法跑了这么久一直是同步。 我们可以写一个方法去测试一下。 public void asyncInvalid() { try { log.info("service start"); asyncInvalidExample(); log.info("service end"); }catch (InterruptedException e){ e.printStackTrace(); } } @Async public void asyncInvalidExample() throws InterruptedException{ Thread.sleep(10); log.info(Thread.currentThread().getName()+":处理完成"); }调用结果很明显,没有进行异步操作,而是同步。 既然线程池都已一个缓冲队列来保存未被消费的任务,那么就一定存在队列被塞满,导致线程丢失的情况。我们写一段代码模拟一下。 配置文件 spring: task: execution: pool: # 最大线程数 max-size: 16 # 核心线程数 core-size: 16 # 存活时间 keep-alive: 10s # 队列大小 queue-capacity: 100 # 是否允许核心线程超时 allow-core-thread-timeout: true # 线程名称前缀 thread-name-prefix: async-task-异步方法 @Async public void asyncRefuseRun() throws InterruptedException { Thread.sleep(5000000); }调用方法 public void asyncRefuseRun() { for (int t = 0;t |
CopyRight 2018-2019 实验室设备网 版权所有 |